查看原文
其他

还在用传统 @Async 注解?试试这个升级版本吧,让异步任务无懈可击!

macrozheng 2023-12-17

The following article is from geekhalo Author geekhalo

1. 概览

Spring 的 @Async 注解,想必大家都非常熟悉,只需在方法上增加 @Aysnc ,便可以将其转化为异步操作,任务在后台线程池中运行。

由于数据存储于内存,服务重启存在任务丢失问题,所以,只适用于要求不太严谨的业务,对于要求严格的场景,只能另选方案。

1.1. 背景

在日常开发过程中,像记录日志这种非核心业务,才允许使用 Spring 的 Async 进行异步化,其他场景需要使用更加完备的 MQ 方案。

MQ 方案

面对这种场景,免不了一顿编码、一通测试,咱们的时间就这样没有了。对于这种纯纯的技术需求,封装框架是投入产出比最高的事。

1.2. 目标

期望框架能够提供:

  1. 不需要 Coding,直接将一个方法转变为 MQ 的异步处理;
  2. 支持 顺序消息 特性,以处理对顺序有依赖的场景;
  3. 发送,消费可以分离,能够在不同的集群中完成,以更好的支持资源隔离;

2. 快速入门

框架基于 RocketMQ 进行构建,请自行完成 RocketMQ 的搭建。

2.1. 引入 RocketMQ

我们使用 rocketmq starter 完成基本配置。

首先,在 pom 中增加 rocketmq starter 依赖,具体如下:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

其次,在 application.yml 中添加 rocketmq 配置,具体如下:

rocketmq:
  name-server: http://127.0.0.1:9876
  producer:
    group: async-demo

其中,name-server 根据具体情况进行配置。

配置完成,可以在项目中:

  1. 注入 RocketMQTemplate 进行消息发送;
  2. 使用 @RocketMQMessageListener 标记处理方法,进行消息消费;

2.2. 添加 lego-starter 依赖

为了方便与 spring-boot 项目集成,lego 提供 lego-starter,以完成快速接入。

在 pom 中增加 starter,具体如下:

<dependency>
    <groupId>com.geekhalo.lego</groupId>
    <artifactId>lego-starter</artifactId>
    <version>0.1.4-async_based_rocketmq-SNAPSHOT</version>
</dependency>

其中,自动配置机制将完成:

  1. 为 @AsyncBasedRocketMQ 注解方法,增加消息拦截,并启动 并行消费者 进行消息消费;
  2. 为 @AsyncForOrderedBasedRocketMQ 注解方法,增加消息拦截,并启动 顺序消费者进行消息消费;

2.3. 并行消息异步处理

我们只需在方法上添加 @AsyncBasedRocketMQ 注解,完成基础配置,该方法便具有异步处理能力。具体如下:

@AsyncBasedRocketMQ(topic = "${async.test.normal.topic}",
        tag = "asyncTest1",
        consumerGroup = "${async.test.normal.group1}")
public void asyncTest1(Long id, String name, AsyncInputBean bean){
    log.info("receive data id {}, name {}, bean", id, name, bean);

    CallData callData = new CallData(id, name, bean);
    this.callDatas.add(callData);
}

@AsyncBasedRocketMQ 定义如下:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncBasedRocketMQ {
    /**
     * MQ topic
     * @return
     */

    String topic();

    /**
     * MQ tag
     * @return
     */

    String tag();

    /**
     * 消费组
     * @return
     */

    String consumerGroup();

    /**
     * nameServer 配置
     * @return
     */

    String nameServer() default "${rocketmq.name-server:}";

    /**
     * 消费者运行的 profile,主要用于发送和消费分离的场景
     * @return
     */
    String consumerProfile() default "
";
}

在 application 文件中增加相关配置,具体如下:

async:
  test:
    normal:
      topic: normal-async-test-topic
      group1: normal-async-test-group1
      group2: normal-async-test-group2

写一个简单的单测,代码如下:

@Test
public void asyncTest1() throws InterruptedException {

    asyncService.getCallDatas().clear();;

    Long id = RandomUtils.nextLong();
    String name = String.valueOf(RandomUtils.nextLong());
    AsyncInputBean bean = createAsyncInputBean();
    asyncService.asyncTest1(id, name, bean);

    {
        List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
        Assertions.assertTrue(CollectionUtils.isEmpty(callDatas));
    }

    TimeUnit.SECONDS.sleep(2);

    {
        List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
        Assertions.assertFalse(CollectionUtils.isEmpty(callDatas));

        AsyncService.CallData callData = callDatas.get(0);
        Assertions.assertEquals(id, callData.getId());
        Assertions.assertEquals(name, callData.getName());
        Assertions.assertEquals(bean, callData.getBean());
    }
}

运行单测,日志如下:

[           main] c.g.l.c.a.normal.NormalAsyncInterceptor  : After serialize, data is xxxxx
[           main] c.g.l.c.a.normal.NormalAsyncInterceptor  : success to send async Task to RocketMQ, args is xxxx, msg is yyyy, result is zzz
[MessageThread_1] com.geekhalo.lego.async.AsyncService     : receive data id 8926281443373242368, name 1130519434586076160, bean
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxxx, cost: 31 ms

为了方便,对部分日志进行简化,但不影响分析结果。

从运行日志可以得出:

  1. NormalAsyncInterceptor 运行在主线程中,主要完成:

  • 对参数进行序列化
  • 将序列化结果发送至 Rocketmq
  • 业务服务AsyncService 运行在消费线程,主要完成:

    • 调用业务方法
    • 打印消费信息

    2.4. 顺序消息异步处理

    RocketMQ 支持顺序消息,通过指定 hashKey 可以保障相同 hashKey的 Message 路由到同一线程,以模拟顺序消费。

    如果需要使用顺序消息,只需使用 @AsyncForOrderedBasedRocketMQ 即可,具体如下:

    @AsyncForOrderedBasedRocketMQ(topic = "${async.test.order.topic}",
            tag = "asyncTest1",
            shardingKey = "#id",
            consumerGroup = "${async.test.order.group1}")
    public void asyncTestForOrder1(Long id, String name, AsyncInputBean bean){
        log.info("receive data id {}, name {}, bean {}", id, name, bean);

        CallData callData = new CallData(id, name, bean);
        this.callDatas.add(callData);
    }

    其中,shardingKey = "#id" 含义为,将参数 id 的值作为 shardingKey。

    与 AsyncBasedRocketMQ 相比,核心配置不变,只增加 shardingKey 配置,具体定义如下:

    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface AsyncForOrderedBasedRocketMQ {
        /**
         * MQ topic
         * @return
         */

        String topic();

        /**
         * MQ tag
         * @return
         */

        String tag();

        /**
         * 顺序消费设置的 hashKey
         * @return
         */

        String shardingKey();

        /**
         * 消费组
         * @return
         */

        String consumerGroup();

        /**
         * nameServer 配置
         * @return
         */

        String nameServer() default "${rocketmq.name-server:}";

        /**
         * 消费者运行的 profile,主要用于发送和消费分离的场景
         * @return
         */
        String consumerProfile() default "
    ";
    }

    在 application.yml 增加相关配置,具体如下:

    async:
      test:
        order:
          topic: order-async-test-topic
          group1: order-async-test-group1
          group2: order-async-test-group2

    编写单元测试用例,具体如下:

    @Test
    public void asyncForOrderTest1() throws InterruptedException {

        List<InputData> inputDatas = new ArrayList<>();
        Long[] ids = new Long[]{RandomUtils.nextLong(), RandomUtils.nextLong(), RandomUtils.nextLong(), RandomUtils.nextLong()};
        String name = String.valueOf(RandomUtils.nextLong());
        AsyncInputBean bean = createAsyncInputBean();

        asyncService.getCallDatas().clear();
        asyncService.asyncTestForOrder1(ids[0], name, bean);
        inputDatas.add(new InputData(ids[0], name, bean));

        {
            List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
            Assertions.assertTrue(CollectionUtils.isEmpty(callDatas));
        }


        for (int i = 0; i< 100; i++) {
            name = String.valueOf(RandomUtils.nextLong());
            bean = createAsyncInputBean();
            asyncService.asyncTestForOrder1(ids[i%ids.length], name, bean);
            inputDatas.add(new InputData(ids[i%ids.length], name, bean));
        }



        TimeUnit.SECONDS.sleep(10);

        {
            List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
            Assertions.assertFalse(CollectionUtils.isEmpty(callDatas));

            Assertions.assertEquals(inputDatas.size(), callDatas.size());

            Map<Long, List<AsyncService.CallData>> callDataMap = callDatas.stream().collect(Collectors.groupingBy(AsyncService.CallData::getId));
            Map<Long, List<InputData>> inputDataMap = inputDatas.stream().collect(Collectors.groupingBy(InputData::getId));

            for (Long id : ids){
                List<AsyncService.CallData> callDataToCheck = callDataMap.get(id);
                List<InputData> inputDataToCheck = inputDataMap.get(id);

                Assertions.assertEquals(callDataToCheck.size(), inputDataToCheck.size());

                for (int j = 0; j < callDataToCheck.size(); j++) {
                    AsyncService.CallData callData = callDataToCheck.get(j);
                    InputData inputData1 = inputDataToCheck.get(j);

                    Assertions.assertEquals(inputData1.getId(), callData.getId());
                    Assertions.assertEquals(inputData1.getName(), callData.getName());
                    Assertions.assertEquals(inputData1.getBean(), callData.getBean());
                }
            }
        }

    }

    运行测试用例,观察日志如下:

    [           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : After serialize, data is xxx
    [           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 6723772904149174272, msg is yyy, result is zzz
    [           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : After serialize, data xxx
    [           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 6723772904149174272, msg is yyy, result is zzz
    [           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : After serialize, data is xxx
    [MessageThread_1] com.geekhalo.lego.async.AsyncService     : receive data id 6723772904149174272, name 8410395540617317376, bean AsyncInputBean(id=81325280405335040, name=1950309494, age=976367396)
    [MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxx, cost: 1 ms
    [           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 8896761273036908544, msg is zzz
    [           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : After serialize, data is xxx
    [MessageThread_1] com.geekhalo.lego.async.AsyncService     : receive data id 6723772904149174272, name 693725382660268032, bean AsyncInputBean(id=1379620281334973440, name=1090615484, age=1421031650)
    [MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt zzz, cost: 0 ms
    [MessageThread_2] com.geekhalo.lego.async.AsyncService     : receive data id 8896761273036908544, name 7307088811299682304, bean AsyncInputBean(id=594404553604282368, name=812325506, age=1784532908)
    [           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : success to send orderly async Task to RocketMQ, args is xxx, shardingKey is 717741895048495104, msg is zzz
    [MessageThread_2] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxx, cost: 0 ms

    从日志上可见:

    1. 主线程 和 消费线程 交叉输出日志;
    2. 拦截器 OrderedAsyncInterceptor 基于 sharding key 向 RocketMQ 发送顺序消息;
    3. AbstractAsyncConsumerContainer 对顺序消息进行消费;

    2.5. 发送和消费分离

    有时为了更好的对资源进行隔离,会单独部署一组集群,用于处理后台任务。

    为支持该模式,AsyncBasedRocketMQ 和 AsyncForOrderedBasedRocketMQ 都提供了 consumerProfile 配置,用于指定 Consumer 在哪个 profile 下执行,如果不设置,则对环境不进行任何要求。

    3.设计&扩展

    3.1. 核心设计

    整体架构

    在方法上添加注解后,框架自动完成:

    1. 增加 AsyncInterceptor Bean,用于对方法进行拦截;
    2. 启动 MQConsumer 监听消息变更,并调用 业务方法;

    3.2. 核心流程

    核心流程如下:

    1. 方法被调用,被 AsyncInterceptor 拦截;

    • 首先,对调用参数进行序列化;
    • 然后,将信息封装为 Message
    • 最后,将Message发送至 RocketMQ
  • 消息在RocketMQ进行存储,并投放至 Consumer;

  • MQPushConsumer,监听消息,并完成业务操作;

    • Consumer 获得 Message 信息
    • 将消息进行反序列化,获得调用参数
    • 使用调用参数调用业务方法

    4. 项目信息

    项目仓库地址:https://gitee.com/litao851025/lego

    项目文档地址:https://gitee.com/litao851025/lego/wikis/support/asyncBasedRocketMQ


    微信8.0将好友放开到了一万,小伙伴可以加我大号了,先到先得,再满就真没了

    扫描下方二维码即可加我微信啦,2022,抱团取暖,一起牛逼。

    推荐阅读


    继续滑动看下一个

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存